/*
*	Copyright(c) 2020 lutianming email：641471957@qq.com
*
*	Sheeps may be copied only under the terms of the GNU Affero General Public License v3.0
*/

#include "framework.h"
#include "ServerTask.h"
#include "ServerCase.h"
#include "ServerSocksProxy.h"
#include "ServerStress.h"
#include "SheepsStruct.h"
#include "ServerHook.h"
#include <algorithm>
#include <mutex>
#include <stdio.h>
#include <thread>

// Capacity of the buffer that accumulates the record-replay SELECT statement.
#define SQL_SIZE 4096
// Max response-time samples kept per API (0 = unlimited); see task_user_api_insert_json.
int api_samples = 0;

// Monotonic source of fresh task ids; released ids are recycled via TaskIdPool.
int TaskIdIndex = 1;
std::list<int> *TaskIdPool = new std::list<int>;
std::mutex* TaskIdPoolLock = new std::mutex;


// Task codes are base-26 strings ("A".."ZZZ") generated like an odometer.
#define TaskCodeMaxLen 3
char task_code_len = 1;
std::vector<char>* task_code = new std::vector<char>;

// Registry of all task configurations keyed by task id.
// NOTE(review): reads/writes are guarded by ServerTaskCfgLock in the accessors below.
std::map<int, hServerTaskConfig>* ServerTaskCfg = new(std::nothrow) std::map<int, hServerTaskConfig>;
std::mutex* ServerTaskCfgLock = new std::mutex;

/*task code manager*/
// Seed the task-code generator: fill task_code with TaskCodeMaxLen random
// digits in [0, 26) and pick a random initial code length in [1, TaskCodeMaxLen].
// NOTE(review): srand is re-seeded inside the loop with NOWTIME + previous
// digit; NOWTIME is likely the same each iteration, so only `y` varies the
// seed — presumably intentional, but worth confirming.
void task_code_init() {
	char y = 0;
	for (int i = 0; i < TaskCodeMaxLen; i++) {
		srand((unsigned int)NOWTIME + y);
		y = rand() % 26;
		task_code->push_back(y);
	}
	task_code_len = rand() % TaskCodeMaxLen + 1;
}

// Copy the current task code into buf as uppercase letters ('A' + digit),
// then advance the code like a base-26 odometer.
// buf must have room for at least task_code_len bytes (no terminator written).
void task_code_get(char* buf) {
	int i;
	char v = 0;
	// Emit the current code before incrementing.
	for (i = 0; i < task_code_len; i++) {
		v = (*task_code)[i];
		*(buf + i) = 'A' + v;
	}
	// Increment the least-significant digit...
	i--;
	v++;
	(*task_code)[i] = v;
	// ...and propagate the carry toward the most-significant digit.
	for (; i >= 0; i--) {
		v = (*task_code)[i];
		if (v < 26) break;
		else{
			(*task_code)[i] = 0;
			if (i != 0) {
				(*task_code)[i - 1]++;
			}
			else {
				// Overflow of the most-significant digit: all digits are now 0
				// and the code length cycles 1 -> 2 -> 3 -> 1.
				task_code_len = (task_code_len + 1) % TaskCodeMaxLen;
				task_code_len = task_code_len == 0 ? TaskCodeMaxLen : task_code_len;
			}
		}
	}
}

/*taskid poll api*/
/*
 * Allocate a task id. Fresh ids come from TaskIdIndex until enough released
 * ids (4096) have accumulated in TaskIdPool, after which pooled ids are reused.
 * Returns the allocated id.
 */
int task_id_pool_get_id() {
	// RAII guard replaces the original manual lock/unlock, which had to be
	// repeated on every return path and is easy to get wrong under maintenance.
	std::lock_guard<std::mutex> guard(*TaskIdPoolLock);
	if (TaskIdPool->size() < 4096) {
		return TaskIdIndex++;
	}
	int task_id = TaskIdPool->front();
	TaskIdPool->pop_front();
	return task_id;
}

/*
 * Return a released task id to the pool so task_id_pool_get_id can reuse it.
 */
void task_id_pool_push_back(int task_id) {
	// RAII guard instead of manual lock()/unlock().
	std::lock_guard<std::mutex> guard(*TaskIdPoolLock);
	TaskIdPool->push_back(task_id);
}
/*taskid poll api*/


/*taskcfg api*/
/*
 * Look up a task configuration by id; returns NULL when absent.
 *
 * BUG FIX: the original read the map without locking while
 * insert_server_taskcfg/delete_server_taskcfg mutate it under
 * ServerTaskCfgLock — a data race. The same lock is now taken here.
 * NOTE(review): the mutex is non-recursive, so callers must not already
 * hold ServerTaskCfgLock — verify external callers.
 */
hServerTaskConfig get_server_taskcfg_by_id(int task_id) {
	std::lock_guard<std::mutex> guard(*ServerTaskCfgLock);
	std::map<int, hServerTaskConfig>::iterator iter = ServerTaskCfg->find(task_id);
	if (iter != ServerTaskCfg->end())
		return iter->second;
	return NULL;
}

/*
 * Register a task configuration under task_id.
 * Note: std::map::insert is a no-op if the key already exists (unchanged
 * behavior from the original).
 */
void insert_server_taskcfg(int task_id, hServerTaskConfig taskcfg) {
	// RAII guard instead of manual lock()/unlock().
	std::lock_guard<std::mutex> guard(*ServerTaskCfgLock);
	ServerTaskCfg->insert(std::pair<int, hServerTaskConfig>(task_id, taskcfg));
}

/*
 * Remove a task configuration from the registry.
 * Only the map entry is erased; the pointed-to config is not freed here.
 */
void delete_server_taskcfg(int task_id) {
	// RAII guard instead of manual lock()/unlock().
	std::lock_guard<std::mutex> guard(*ServerTaskCfgLock);
	ServerTaskCfg->erase(task_id);
}
/*taskcfg api*/

/*
 * Resolve a task for a timeout report: the task must still exist AND still be
 * on the same run_number (i.e. it has not been restarted since the report was
 * scheduled). Returns NULL otherwise.
 */
hServerTaskConfig task_report_timeout(int task_id, int run_number) {
	hServerTaskConfig cfg = get_server_taskcfg_by_id(task_id);
	if (cfg == NULL || cfg->run_number != run_number)
		return NULL;
	return cfg;
}

// Broadcast "task finished" (S2C_Task_Finish) to every agent in the task's
// group and move the task into its wind-down states.
// FIXME (from original author): resource release should be moved to the task
// worker thread — this function has a known design issue.
void task_push_over(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	// Already cleaning up: ask the worker loop to stop outright.
	if (taskcfg->taskState == STAGE_CLEAN) {
		taskrun->enforcestop = true;
		return;
	}
	// Only a running task can be finished; ignore duplicate calls.
	if (taskcfg->taskState != STATE_RUNING) return;
	taskrun->stage = STAGE_CLEAN;
	taskcfg->taskState = STATE_BALANCE;

	// Build the protocol frame: header in-place, JSON payload after it.
	char buf[64] = { 0x0 };
	int n = snprintf(buf + SHEEPS_HEAD_SIZE, sizeof(buf) - SHEEPS_HEAD_SIZE, "{\"TaskID\":%d}", taskcfg->taskID);
	t_stress_protocol_head* head = (t_stress_protocol_head*)buf;
	head->msgLen = n + SHEEPS_HEAD_SIZE;
	head->cmdNo = S2C_Task_Finish;

	// Fan the frame out to every agent socket in the task's group.
	std::map<HSOCKET, t_sheeps_agent*>* agents;
	std::map<HSOCKET, t_sheeps_agent*>::iterator iter_agent;
	t_project_config* projectcfg = ProjectConfig[taskcfg->projectID];
	std::map<std::string, t_agent_group*>::iterator iter_gropu = projectcfg->groups.find(taskcfg->groupID);
	if (iter_gropu != projectcfg->groups.end()) {
		agents = iter_gropu->second->agents;
		for (iter_agent = agents->begin(); iter_agent != agents->end(); ++iter_agent) {
			HsocketSend(iter_agent->first, buf, head->msgLen);
		}
	}
}

/*
 * Record the death of one virtual user.
 * errmsg format: "<reason>|<extra message>"; a LEADING '|' (empty reason)
 * marks a successful run, anything else a failure. errmsg is split in place
 * (the caller's buffer is mutated, as before).
 */
void task_user_dead(int time, hServerTaskConfig taskcfg, int machine_id, int user_number, int run_time, const char* errmsg){
	hServerTaskRun taskrun = taskcfg->taskrun;
	TaskReportError* error = &taskrun->error;
	if (*errmsg == '|') AtomicAddOneRet(&taskrun->run_success); else AtomicAddOneRet(&taskrun->run_failed);
	AtomicAddOneRet(&taskrun->UserDeadCount);
	// BUG FIX: strchr returns NULL when errmsg contains no '|'; the original
	// dereferenced it unconditionally and crashed. Treat a '|'-less message
	// as reason-only with an empty extra message.
	const char* extra = "";
	const char* p = strchr(errmsg, '|');
	if (p) {
		*(char*)p = 0x0;
		extra = p + 1;
	}
	char buf[512] = { 0x0 };
	snprintf(buf, sizeof(buf), "%d|任务id:%d|机器id:%d|用户编号:%d|运行时长:%d秒|附加消息:%s|结束原因:%s", time, taskcfg->taskID, machine_id, user_number, run_time, extra, errmsg);
	error->vecLock->lock();
	error->vecData->push_back(buf);
	error->vecLock->unlock();
}

/*
 * Fold an agent-reported online-user delta into the task's running total.
 */
void task_user_online_count(hServerTaskConfig taskcfg, int count){
	hServerTaskRun run = taskcfg->taskrun;
	AtomicAddAndRet(&run->UserOnlineCount, count);
}

/*
 * Accumulate per-address connect statistics reported by an agent.
 * The entry for addr is created lazily; a lost insert race (another thread
 * created the same key first) is resolved by looking the entry up again.
 */
void task_user_connect_result(hServerTaskConfig taskcfg, const char* addr , int success, int faile) {
	hServerTaskRun taskrun = taskcfg->taskrun;
	TaskReportConnect* connect = &taskrun->connect;
	const int total = success + faile;
	AtomicAddAndRet(&connect->total, total);

	std::map<std::string, ReportInfoConnect*>* info = connect->info;
	ReportInfoConnect* report = NULL;
	while (report == NULL) {
		std::map<std::string, ReportInfoConnect*>::iterator it = info->find(addr);
		if (it != info->end()) {
			report = it->second;
			break;
		}
		ReportInfoConnect* fresh = (ReportInfoConnect*)malloc(sizeof(ReportInfoConnect));
		if (!fresh) return;
		memset(fresh, 0x0, sizeof(ReportInfoConnect));
		if (info->insert(std::make_pair(addr, fresh)).second)
			report = fresh;
		else
			free(fresh);	// lost the race; retry the lookup
	}
	AtomicAddAndRet(&report->total, total);
	AtomicAddAndRet(&report->success, success);
	AtomicAddAndRet(&report->faile, faile);
}

/*
 * Accumulate per-address traffic statistics (application up/down and raw
 * network up/down bytes). The entry for addr is created lazily; a lost
 * insert race is resolved by re-running the lookup.
 */
void task_user_netflow(hServerTaskConfig taskcfg, const char* addr, int up, int down, int net_up, int net_down){
	hServerTaskRun taskrun = taskcfg->taskrun;
	TaskReportNetflow* net = &taskrun->netpack;

	AtomicAddAndRet(&net->total, up + down);

	std::map<std::string, ReportInfoNet*>* info = net->info;
	ReportInfoNet* report = NULL;
	while (report == NULL) {
		std::map<std::string, ReportInfoNet*>::iterator it = info->find(addr);
		if (it != info->end()) {
			report = it->second;
			break;
		}
		ReportInfoNet* fresh = (ReportInfoNet*)malloc(sizeof(ReportInfoNet));
		if (!fresh) return;
		memset(fresh, 0x0, sizeof(ReportInfoNet));
		if (info->insert(std::make_pair(addr, fresh)).second)
			report = fresh;
		else
			free(fresh);	// lost the race; retry the lookup
	}
	AtomicAddAndRet(&report->up, up);
	AtomicAddAndRet(&report->down, down);
	AtomicAddAndRet(&report->net_up, net_up);
	AtomicAddAndRet(&report->net_down, net_down);
}

/*
 * qsort-style comparator for int response-time samples.
 * BUG FIX: the original returned `*x - *y`, which overflows (undefined
 * behavior / wrong sign) when the operands are far apart, e.g. a large
 * negative vs a large positive value. Use branch-free comparisons instead.
 */
static inline int api_response_cmp(const void* x, const void* y){
	int a = *(const int*)x;
	int b = *(const int*)y;
	return (a > b) - (a < b);
}

/*
 * Sort every API's collected response-time samples ascending and record how
 * many samples were sorted (info->sorted), for percentile extraction later.
 */
void task_user_api_sort(hServerTaskConfig taskcfg, hServerTaskRun taskrun) {
	std::map<std::string, ReportInfoApi*>* api = taskrun->api;
	std::map<std::string, ReportInfoApi*>::iterator iter;
	for (iter = api->begin(); iter != api->end(); ++iter) {
		ReportInfoApi* info = iter->second;
		if (info->success) {
			std::vector<int>* vec = info->vecData;
			// BUG FIX: std::qsort(&(*vec)[0], ...) indexed element 0, which is
			// undefined behavior when the vector is empty; std::sort handles
			// the empty range, is type-safe, and typically faster for ints.
			std::sort(vec->begin(), vec->end());
			info->sorted = (long)vec->size();
		}
	}
}

/*
 * Parse response-time samples out of a cJSON array and append them to
 * info->vecData. Each array element is a string of '|'-separated integers,
 * e.g. "12|34|56". Sampling stops once info->success exceeds the global
 * api_samples cap (0 = unlimited).
 */
void task_user_api_insert_json(ReportInfoApi* info, cJSON* response_time) {
	if (!response_time)
		return;
	if (api_samples && info->success > api_samples)
		return;
	std::vector<int> vec;
	int len = cJSON_GetArraySize(response_time);
	for (int i = 0; i < len; i++) {
		// BUG FIX: guard both the item and valuestring — valuestring is NULL
		// for non-string items and the original passed it straight to atoi.
		cJSON* item = cJSON_GetArrayItem(response_time, i);
		if (!item || !item->valuestring)
			continue;
		const char* p = item->valuestring;
		vec.push_back(atoi(p));
		while ((p = strchr(p, '|')) != NULL) {
			p++;
			vec.push_back(atoi(p));
		}
	}
	info->vecData->insert(info->vecData->end(), vec.begin(), vec.end());
	AtomicAddAndRet(&info->success, (long)vec.size());
}

/*
 * Accumulate per-API traffic/error counters and response-time samples
 * reported by an agent. The per-API record is created lazily; a lost insert
 * race (another thread registered api_name first) is resolved by retrying
 * the lookup, exactly like the original goto-TRY loop.
 *
 * The counter updates were duplicated verbatim in both the found and the
 * freshly-inserted branch; they are now written once after the record is
 * resolved (same behavior, single copy to maintain).
 */
void task_user_api(hServerTaskConfig taskcfg, const char* api_name, int send_flow, int send_count, int recv_flow, int recv_count, int error_count, cJSON* response_time){
	hServerTaskRun taskrun = taskcfg->taskrun;
	std::map<std::string, ReportInfoApi*>* api = taskrun->api;
	ReportInfoApi* info = NULL;
	while (info == NULL) {
		std::map<std::string, ReportInfoApi*>::iterator iter = api->find(api_name);
		if (iter != api->end()) {
			info = iter->second;
			break;
		}
		ReportInfoApi* fresh = (ReportInfoApi*)malloc(sizeof(ReportInfoApi));
		if (!fresh) return;
		memset(fresh, 0x0, sizeof(ReportInfoApi));
		fresh->vecData = new std::vector<int>;
		if (api->insert(std::make_pair(api_name, fresh)).second) {
			info = fresh;
		}
		else {
			// lost the race; release and retry the lookup
			delete fresh->vecData;
			free(fresh);
		}
	}
	if (send_flow) AtomicAddAndRet64(&info->send_flow, send_flow);
	if (send_count) AtomicAddAndRet(&info->send_count, send_count);
	if (recv_flow) AtomicAddAndRet64(&info->recv_flow, recv_flow);
	if (recv_count) AtomicAddAndRet(&info->recv_count, recv_count);
	if (error_count) AtomicAddAndRet(&info->error, error_count);
	task_user_api_insert_json(info, response_time);
}

/*
 * Accumulate a user-defined counter identified by key.
 * stype selects the reporting mode (PER_SUM/PER_AVG/HIS_SUM/HIS_AVG) and
 * space_time its reporting interval; both are fixed on first sight of key.
 */
void task_user_counter(hServerTaskConfig taskcfg, const char* key, int val, int stype, int space_time){
	hServerTaskRun taskrun = taskcfg->taskrun;
	TaskReportCounter* cus = &taskrun->counter;

	std::map<std::string, ReportInfoCounter*>* info = cus->info;
	ReportInfoCounter* report = NULL;
	while (report == NULL) {
		std::map<std::string, ReportInfoCounter*>::iterator it = info->find(key);
		if (it != info->end()) {
			report = it->second;
			break;
		}
		ReportInfoCounter* fresh = (ReportInfoCounter*)malloc(sizeof(ReportInfoCounter));
		if (!fresh) return;
		memset(fresh, 0x0, sizeof(ReportInfoCounter));
		// BUG FIX: initialize the descriptive fields BEFORE publishing the
		// entry in the map. The original inserted first and filled stype /
		// space_time / start_time afterwards, leaving a window in which the
		// report thread could observe them still zero (and divide by
		// space_time == 0 in the PER_AVG path).
		fresh->stype = stype;
		fresh->space_time = space_time;
		fresh->start_time = NOWTIME;
		if (info->insert(std::pair<std::string, ReportInfoCounter*>(key, fresh)).second)
			report = fresh;
		else
			free(fresh);	// lost the race; retry the lookup
	}
	AtomicAddAndRet(&report->value, val);
}

/*
 * Reserve the next batch of users to start: at most onceUser at a time,
 * capped by how many of totalUser have not been activated yet. The returned
 * batch (which can be negative if totalUser was lowered, meaning "remove
 * users") is immediately added to UserActiveCount.
 */
static inline int task_add_once_user(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	const int remaining = taskcfg->totalUser - taskrun->UserActiveCount;
	const int batch = (remaining > taskcfg->onceUser) ? taskcfg->onceUser : remaining;
	taskrun->UserActiveCount += batch;
	return batch;
}

/*
 * Build the SQL used to stream recorded traffic out of the case database:
 * one SELECT per replay source address, UNIONed together. The paging suffix
 * (" order by recordtime limit N,100") is appended later, on every fetch, by
 * task_get_record_msg.
 * Returns 0 on success, -1 when there is nothing to replay or on OOM.
 */
static int task_init_record_sql(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	PushMessage* pushmsg = &taskrun->pushmsg;
	if (taskcfg->replayAddr->empty())
	{
		// No replay addresses: leave an empty SQL string behind.
		pushmsg->dbsql = (char*)malloc(1);
		// BUG FIX: the original wrote through this pointer without a NULL check.
		if (pushmsg->dbsql == NULL)
		{
			LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
			return -1;
		}
		*(pushmsg->dbsql) = 0x0;
		return -1;
	}

	pushmsg->dbsql = (char*)malloc(SQL_SIZE);
	if (pushmsg->dbsql == NULL)
	{
		LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
		return -1;
	}
	memset(pushmsg->dbsql, 0x0, SQL_SIZE);

	char temp[512] = { 0x0 };
	char table[128] = { 0x0 };
	// One table per recorded source address; event_type < 3 filters to the
	// replayable events.
	std::list<Readdr*>::iterator iter = taskcfg->replayAddr->begin();
	for (; iter != taskcfg->replayAddr->end(); ++iter){
		addr_to_table((*iter)->srcAddr, table, sizeof(table));
		snprintf(temp, sizeof(temp), "select recordtime, event_type, protocol, ip, port, sessionid, content,note from \"%s\" where event_type < 3", table);
		if (pushmsg->sqllen == 0)
			pushmsg->sqllen += snprintf(pushmsg->dbsql, SQL_SIZE, "%s", temp);
		else
			pushmsg->sqllen += snprintf(pushmsg->dbsql + pushmsg->sqllen, size_t(SQL_SIZE) - pushmsg->sqllen, " union  %s", temp);
	}
	return 0;
}

/*
 * Number of agent connections in the task's group, or 0 when the group does
 * not exist.
 */
static int task_client_count(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	t_project_config* project = ProjectConfig[taskcfg->projectID];
	std::map<std::string, t_agent_group*>::iterator group = project->groups.find(taskcfg->groupID);
	if (group == project->groups.end())
		return 0;
	return int(group->second->agents->size());
}

// STAGE_INIT handler: distribute the first batch of users across the group's
// agents, send each one an S2C_Task_Init frame, then open the case database
// and prepare the replay SQL. On success the task moves to STAGE_LOOP /
// STATE_RUNING; with no agents (or OOM) it goes straight to STAGE_CLEAN.
static void task_push_init(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	int client_count = task_client_count(taskrun, taskcfg);
	if (client_count == 0)
	{
		// Nobody to run the task on.
		taskrun->stage = STAGE_CLEAN;
		return;
	}
	int user_index = taskrun->UserActiveCount;
	// loopMode == 2 (timed replay) starts users immediately; other modes
	// start with 0 and ramp up later via task_push_add_once_user.
	int once_user = taskcfg->loopMode == 2? task_add_once_user(taskrun, taskcfg): 0;
	// Split once_user across agents: `a` each, first `b` agents get one extra.
	int a = once_user / client_count;
	int b = once_user % client_count;

	char buf[512] = { 0x0 };
	char tem[256] = { 0x0 };

	// Errors are only ignorable in loopMode 0 when the config says so.
	const char* igerr = "false";
	if (taskcfg->loopMode == 0 && taskcfg->ignoreErr == true)
		igerr = "true";
	
	// New run generation; stale reports for older run_numbers are discarded.
	taskcfg->run_number++;
	snprintf(tem, sizeof(tem), "{\"TaskID\":%d,\"runNumber\":%d,\"UserExecuteCycle\":%d,\"projectID\":%d,\"IgnoreErr\":%s,\"LogLevel\":%d, \"LogReport\":%d,\"Parms\":\"%s\",", 
		taskcfg->taskID, taskcfg->run_number, taskcfg->userExecuteCycle, taskcfg->projectID, igerr, taskcfg->logLevel, taskcfg->report_log, taskcfg->parms1);
	int machine_id = 0;

	t_sheeps_agent* agent;
	std::map<HSOCKET, t_sheeps_agent*>* agents;
	std::map<HSOCKET, t_sheeps_agent*>::iterator iter_agent;
	t_project_config* projectcfg = ProjectConfig[taskcfg->projectID];
	std::map<std::string, t_agent_group*>::iterator iter_gropu = projectcfg->groups.find(taskcfg->groupID);
	if (iter_gropu != projectcfg->groups.end()) {
		agents = iter_gropu->second->agents;
		for (iter_agent = agents->begin(); iter_agent != agents->end(); ++iter_agent) {
			agent = iter_agent->second;
			if (agent->ready == AGENT_FAIL)
				continue;

			int user_count = a;
			if (b > 0){
				user_count++;
				b--;
			}
			// Complete the shared JSON prefix with this agent's machine id,
			// its share of users and the starting user index.
			memset(buf, 0, sizeof(buf));
			int n = snprintf(buf + SHEEPS_HEAD_SIZE, sizeof(buf) - SHEEPS_HEAD_SIZE, "%s\"MachineID\":%d,\"UserCount\":%d, \"UserIndex\":%d}", tem, machine_id++, user_count, user_index);
			t_stress_protocol_head* head = (t_stress_protocol_head*)buf;
			head->msgLen = n + SHEEPS_HEAD_SIZE;
			head->cmdNo = S2C_Task_Init;
			HsocketSend(iter_agent->first, buf, head->msgLen);
			user_index += user_count;
		}
	}

	if (once_user > 0)
		taskrun->LastPushUserTime = NOWTIME;
	taskrun->stage = STAGE_LOOP;

	// Prepare the record-replay pipeline (queue + sqlite + SQL).
	PushMessage* pushmsg = &taskrun->pushmsg;
	pushmsg->tempMsg = new(std::nothrow) std::list<RecordMsg*>;
	if (pushmsg->tempMsg == NULL)
	{
		LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
		taskrun->stage = STAGE_CLEAN;
		return;
	}
	taskcfg->taskState = STATE_RUNING;
	char fullname[256] = { 0x0 };
	snprintf(fullname, sizeof(fullname), "%s/%s/%s", CasePath, taskcfg->caseDir, taskcfg->dbName);
	sqlite3_open(fullname, &pushmsg->dbConn);
	task_init_record_sql(taskrun, taskcfg);
}

// Periodic user ramp: every spaceTime seconds, distribute the next batch of
// users (task_add_once_user) across the group's agents via S2C_Task_Add_User.
// Note: once_user — and therefore the remainder `b` — can be NEGATIVE when
// totalUser was lowered below UserActiveCount; the b<0 branch spreads that
// shrink across agents, so "Change" in the payload can ask agents to remove
// users.
static void task_push_add_once_user(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	t_sheeps_agent* agent;
	std::map<HSOCKET, t_sheeps_agent*>* agents;
	std::map<HSOCKET, t_sheeps_agent*>::iterator iter_agent;
	t_project_config* projectcfg = ProjectConfig[taskcfg->projectID];
	std::map<std::string, t_agent_group*>::iterator iter_gropu = projectcfg->groups.find(taskcfg->groupID);
	if (iter_gropu == projectcfg->groups.end() || iter_gropu->second->agents->size() <= 0)
		return;
	agents = iter_gropu->second->agents;

	// Rate-limit to one batch per spaceTime seconds.
	time_t nowtime = NOWTIME;
	if (nowtime - taskrun->LastPushUserTime < taskcfg->spaceTime)
		return;

	int user_index = taskrun->UserActiveCount;
	int client_count = task_client_count(taskrun, taskcfg);
	int once_user = task_add_once_user(taskrun, taskcfg);
	if (client_count == 0 || once_user == 0)
		return;
	// `a` users per agent; remainder `b` is spread one-by-one (sign-aware).
	int a = once_user / client_count;
	int b = once_user % client_count;

	int user_count = 0;
	char buf[128] = { 0x0 };
	std::map<HSOCKET, h_SHEEPS_AGENT>::iterator iter = agents->begin();
	for (; iter != agents->end(); ++iter)
	{
		agent = iter->second;
		if (agent->ready == AGENT_FAIL)
			continue;
		user_count = a;
		if (b > 0){
			user_count++;
			b--;
		}
		if (b < 0){
			// Negative remainder (shrinking): take one extra user away.
			user_count--;
			b++;
		}
		memset(buf, 0, sizeof(buf));
		int n = snprintf(buf + SHEEPS_HEAD_SIZE, sizeof(buf) - SHEEPS_HEAD_SIZE, "{\"TaskID\":%d,\"Change\":%d, \"UserIndex\":%d}", taskcfg->taskID, user_count, user_index);
		t_stress_protocol_head* head = (t_stress_protocol_head*)buf;
		head->msgLen = n + SHEEPS_HEAD_SIZE;
		head->cmdNo = S2C_Task_Add_User;
		HsocketSend(iter->first, buf, head->msgLen);
		user_index += user_count;
	}
	taskrun->LastPushUserTime = nowtime;
}

// Stop the record-replay stream: close the case database and tell every agent
// in the group (S2C_Task_Stop_Case) whether the case should keep looping
// client-side ("Loop":true only in loopMode 0).
static void task_stop_push_msg(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	PushMessage* pushmsg = &taskrun->pushmsg;
	pushmsg->stopPushRecord = true;
	sqlite3_close(pushmsg->dbConn);
	pushmsg->dbConn = NULL;

	char buf[512] = { 0x0 };
	int n = 0;
	if (taskcfg->loopMode == 0)
		n = snprintf(buf + SHEEPS_HEAD_SIZE, sizeof(buf) - SHEEPS_HEAD_SIZE, "{\"TaskID\":%d,\"Loop\":true}", taskcfg->taskID);
	else
		n = snprintf(buf + SHEEPS_HEAD_SIZE, sizeof(buf) - SHEEPS_HEAD_SIZE, "{\"TaskID\":%d,\"Loop\":false}", taskcfg->taskID);
	t_stress_protocol_head* head = (t_stress_protocol_head*)buf;
	head->msgLen = n + SHEEPS_HEAD_SIZE;
	head->cmdNo = S2C_Task_Stop_Case;

	// Broadcast to every agent in the task's group (if the group still exists).
	std::map<HSOCKET, t_sheeps_agent*>* agents;
	std::map<HSOCKET, t_sheeps_agent*>::iterator iter_agent;
	t_project_config* projectcfg = ProjectConfig[taskcfg->projectID];
	std::map<std::string, t_agent_group*>::iterator iter_gropu = projectcfg->groups.find(taskcfg->groupID);
	if (iter_gropu == projectcfg->groups.end() || iter_gropu->second->agents->size() <= 0)
		return;
	agents = iter_gropu->second->agents;
	std::map<HSOCKET, h_SHEEPS_AGENT>::iterator iter = agents->begin();
	for (; iter != agents->end(); ++iter){
		HsocketSend(iter->first, buf, head->msgLen);
	}
}

/*
 * Fetch the next page (up to 100 rows, ordered by recordtime) of recorded
 * traffic from the case database into pushmsg->tempMsg. Advances startRow by
 * the number of rows read. When the query is exhausted and the task is not in
 * timed-replay mode (loopMode != 2), the replay stream is stopped.
 */
static void task_get_record_msg(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	PushMessage* pushmsg = &taskrun->pushmsg;
	int rowcount = 0;
	if (pushmsg->sqllen != 0)
	{
		// Re-append the paging suffix at sqllen; it overwrites the suffix
		// written by the previous call.
		snprintf(pushmsg->dbsql + pushmsg->sqllen, size_t(SQL_SIZE) - pushmsg->sqllen, " order by recordtime limit %d,100", pushmsg->startRow);

		sqlite3_stmt* stmt = NULL;
		int result = sqlite3_prepare_v2(pushmsg->dbConn, pushmsg->dbsql, -1, &stmt, NULL);
		if (result == SQLITE_OK)
		{
			while (sqlite3_step(stmt) == SQLITE_ROW)
			{
				rowcount++;
				RecordMsg* msg = (RecordMsg*)malloc(sizeof(RecordMsg));
				if (msg == NULL)
				{
					LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
					continue;
				}
				memset(msg, 0x0, sizeof(RecordMsg));
				msg->record_time = sqlite3_column_int64(stmt, 0);
				msg->event = sqlite3_column_int(stmt, 1);
				msg->protocol = sqlite3_column_int(stmt, 2);
				// BUG FIX: sqlite3_column_text returns NULL for SQL NULL (or
				// OOM); the original passed it to snprintf/strlen and crashed.
				const char* ip = (const char*)sqlite3_column_text(stmt, 3);
				if (ip == NULL) ip = "";
				snprintf(msg->ip, sizeof(msg->ip), "%s", ip);
				msg->port = sqlite3_column_int(stmt, 4);
				msg->sessionid = sqlite3_column_int64(stmt, 5);
				const char* content = (const char*)sqlite3_column_text(stmt, 6);
				if (content == NULL) content = "";
				msg->content_len = (int)strlen(content);
				msg->content = (char*)malloc(msg->content_len + 1);
				if (msg->content == NULL)
				{
					LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
					free(msg);
					continue;
				}
				memcpy(msg->content, content, msg->content_len +1);

				const char* note = (const char*)sqlite3_column_text(stmt, 7);
				if (note == NULL) note = "";
				msg->note_len = (int)strlen(note);
				msg->note = (char*)malloc(msg->note_len+1);
				if (msg->note == NULL) {
					LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
					free(msg->content);
					free(msg);
					continue;
				}
				memcpy(msg->note, note, msg->note_len + 1);

				pushmsg->tempMsg->push_back(msg);
			}
		}
		sqlite3_finalize(stmt);
		pushmsg->startRow += rowcount;
	}
	
	// End of the recording: stop pushing unless we are in timed-replay mode,
	// which keeps waiting for its schedule.
	if (rowcount == 0 && taskcfg->loopMode != 2)
	{
		task_stop_push_msg(taskrun, taskcfg);
	}
}

/*
 * Rewrite a recorded message's destination if the task maps "ip:port" to a
 * replacement address; otherwise the message is left untouched.
 */
static void task_chang_record_msg_addr(hServerTaskRun taskrun, hServerTaskConfig taskcfg, RecordMsg* msg)
{
	char key[48] = { 0x0 };
	snprintf(key, sizeof(key), "%s:%d", msg->ip, msg->port);
	std::map<std::string, Readdr*>::iterator it = taskcfg->changeAddr->find(key);
	if (it == taskcfg->changeAddr->end())
		return;
	Readdr* dst = it->second;
	char* sep = strstr(dst->dstAddr, ":");
	if (sep == NULL)
		return;
	// Split dstAddr at the colon: host part replaces ip, tail is the port.
	memset(msg->ip, 0x0, sizeof(msg->ip));
	memcpy(msg->ip, dst->dstAddr, sep - dst->dstAddr);
	msg->port = atoi(sep + 1);
}

/*
 * Serialize one recorded message as an S2C_Task_Add_Case JSON frame and
 * broadcast it to every ready agent in the task's group. The destination
 * address is first rewritten through the task's changeAddr map.
 * (A commented-out binary cJSON variant of this serialization was removed.)
 */
static void task_push_record_msg_to_client(hServerTaskRun taskrun, hServerTaskConfig taskcfg, RecordMsg* msg)
{
	task_chang_record_msg_addr(taskrun, taskcfg, msg);

	// Frame buffer: payload plus header plus slack for the fixed JSON fields.
	size_t alloc_len = msg->content_len + msg->note_len + 256;
	char* buf = (char*)malloc(alloc_len);
	if (buf == NULL){
		LOG(slogid, LOG_ERROR, "%s:%d malloc error\r\n", __func__, __LINE__);
		return;
	}
	int n = 0;
	n = snprintf(buf + SHEEPS_HEAD_SIZE, alloc_len - SHEEPS_HEAD_SIZE, "{\"TaskID\":%d,\"EventType\":%d,\"ConnType\":%d,\"Host\":\"%s\",\"Port\":%d,\"ShortKey\":%lld,\"Timestamp\":%lld,\"Microsecond\":%lld,\"Msg\":\"%s\",\"Note\":\"%s\"}",
		taskcfg->taskID, msg->event, msg->protocol, msg->ip, msg->port, msg->sessionid, msg->record_time / 1000000, msg->record_time % 1000000, msg->content, msg->note);
	t_stress_protocol_head* head = (t_stress_protocol_head*)buf;
	head->msgLen = n + SHEEPS_HEAD_SIZE;
	head->cmdNo = S2C_Task_Add_Case;
	head->binary = 0;

	t_sheeps_agent* agent;
	t_project_config* projectcfg = ProjectConfig[taskcfg->projectID];
	std::map<std::string, t_agent_group*>::iterator iter_gropu = projectcfg->groups.find(taskcfg->groupID);
	// BUG FIX: the original returned early when the group was missing/empty
	// and leaked buf; with the condition inverted, buf is always freed below.
	if (iter_gropu != projectcfg->groups.end()) {
		std::map<HSOCKET, t_sheeps_agent*>* agents = iter_gropu->second->agents;
		std::map<HSOCKET, t_sheeps_agent*>::iterator iter_agent;
		for (iter_agent = agents->begin(); iter_agent != agents->end(); ++iter_agent)
		{
			agent = iter_agent->second;
			if (agent->ready == AGENT_FAIL)
				continue;
			HsocketSend(iter_agent->first, buf, head->msgLen);
		}
	}
	free(buf);
}

// STAGE_LOOP helper: drain up to 20 queued recorded messages per tick to the
// agents, refilling the queue from sqlite when it runs dry. In timed-replay
// mode (loopMode 2) messages are held back until their original relative
// timestamp has elapsed in real time.
static void task_push_record_msg(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	PushMessage* pushmsg = &taskrun->pushmsg;
	if (pushmsg->stopPushRecord)
		return;
	if (pushmsg->tempMsg->empty())
		task_get_record_msg(taskrun, taskcfg);
	if (pushmsg->tempMsg->empty())
		return;

	RecordMsg* msg = NULL;
	for (int i = 0; i < 20; i++) {
		if (pushmsg->tempMsg->empty())
			break;
		msg = pushmsg->tempMsg->front();
		// Anchor the replay clock on the first message pushed.
		if (pushmsg->startReal == 0){
			pushmsg->startReal = GetSysTimeMicros();
			pushmsg->startRecord = msg->record_time;
		}
		if (taskcfg->loopMode == 2) {
			// Timed replay: wait until real elapsed time catches up with the
			// message's offset in the recording.
			time_t shouldtime = msg->record_time - pushmsg->startRecord;
			time_t realtime = GetSysTimeMicros() - pushmsg->startReal;
			if (shouldtime > realtime)
				return;
		}
		task_push_record_msg_to_client(taskrun, taskcfg, msg);

		// Track how far into the recording (seconds) we have replayed.
		taskrun->case_time_len = (long)((msg->record_time - pushmsg->startRecord) * 0.000001);

		free(msg->note);
		free(msg->content);
		free(msg);
		pushmsg->tempMsg->pop_front();
	}
}

// Append one row of user-liveness statistics and drive end-of-task detection:
// when no users remain alive, either finish the task (all users dead during
// STAGE_LOOP) or complete cleanup (STAGE_CLEAN -> STAGE_OVER). After 10
// all-dead samples, rows stop being emitted unless `flag` forces a write.
static void task_alive_user_reprot(hServerTaskRun taskrun, hServerTaskConfig taskcfg, TaskReportOnline* online, long long nowtime, bool flag) {
	//int left_user = taskcfg->totalUser - taskrun->UserActiveCount;
	int alive_user = taskrun->UserActiveCount - taskrun->UserDeadCount;

	if (alive_user == 0) {
		if (taskrun->stage == STAGE_LOOP && taskrun->UserDeadCount == taskcfg->totalUser) task_push_over(taskrun, taskcfg);
		else if (taskrun->stage == STAGE_CLEAN) taskrun->stage = STAGE_OVER;

		// Suppress repeated all-zero rows after 10 consecutive samples.
		if (online->zero_count == 10 && !flag)
			return;
		if (taskrun->UserDeadCount == taskcfg->totalUser)
			online->zero_count++;
	}
	else {
		online->zero_count = 0;
	}

	char buf[256] = { 0x0 };
	snprintf(buf, sizeof(buf), "%lld|%d|%ld|%d|%ld|%ld|%ld", nowtime, taskcfg->totalUser, taskrun->UserActiveCount, alive_user, taskrun->UserOnlineCount, taskrun->run_success, taskrun->run_failed);
	online->vecData->push_back(buf);
}

/*
 * Append one report row per destination address with the connect
 * success/failure counts accumulated since the last sample (counters are
 * swapped back to zero). Skipped entirely when nothing happened.
 */
static void task_connect_user_report(hServerTaskRun taskrun, hServerTaskConfig taskcfg, TaskReportConnect* connect, long long nowtime) {
	if (AtomicRetAndSwap(&connect->total, 0) == 0)
		return;

	std::map<std::string, ReportInfoConnect*>* info = connect->info;
	char line[256] = { 0x0 };
	// Risky (from original author): entries may be mutated concurrently by
	// the agent service threads while we iterate.
	std::map<std::string, ReportInfoConnect*>::iterator it;
	for (it = info->begin(); it != info->end(); ++it)
	{
		ReportInfoConnect* rep = it->second;
		//long total = AtomicRetAndSwap(&rep->total, 0);
		long ok = AtomicRetAndSwap(&rep->success, 0);
		long bad = AtomicRetAndSwap(&rep->faile, 0);

		snprintf(line, sizeof(line), "%lld|%s|%ld|%ld", nowtime, it->first.c_str(), ok, bad);
		connect->vecData->push_back(line);
	}
}

/*
 * Append one traffic row per destination address (app up/down plus raw
 * network up/down since the last sample; counters are swapped back to zero).
 * After 10 consecutive idle samples, rows stop unless `flag` forces a write.
 */
static void task_netflow_user_report(hServerTaskRun taskrun, hServerTaskConfig taskcfg, TaskReportNetflow* net, long long nowtime, bool flag) {
	long total = AtomicRetAndSwap(&net->total, 0);
	if (total != 0) {
		net->zero_count = 0;
	}
	else {
		if (net->zero_count == 10 && !flag)
			return;
		net->zero_count++;
	}

	std::map<std::string, ReportInfoNet*>* info = net->info;
	char line[256] = { 0x0 };
	// Risky (from original author): entries may be mutated concurrently by
	// the agent service threads while we iterate.
	std::map<std::string, ReportInfoNet*>::iterator it;
	for (it = info->begin(); it != info->end(); ++it)
	{
		ReportInfoNet* rep = it->second;
		long up = AtomicRetAndSwap(&rep->up, 0);
		long down = AtomicRetAndSwap(&rep->down, 0);
		long net_up = AtomicRetAndSwap(&rep->net_up, 0);
		long net_down = AtomicRetAndSwap(&rep->net_down, 0);

		snprintf(line, sizeof(line), "%lld|%s|%ld|%ld|%ld|%ld", nowtime, it->first.c_str(), up, down, net_up, net_down);
		net->vecData->push_back(line);
	}
}

/*
 * Append a row for every user-defined counter whose reporting interval has
 * elapsed (or unconditionally when `flag` forces a final write).
 * PER_* counters are swapped back to zero each sample; HIS_* counters keep
 * accumulating. PER_AVG averages over the interval, HIS_AVG over the
 * counter's whole lifetime.
 */
static void task_custom_user_report(hServerTaskRun taskrun, hServerTaskConfig taskcfg, TaskReportCounter* cus, long long nowtime, bool flag) {
	std::map<std::string, ReportInfoCounter*>* info = cus->info;
	std::map<std::string, ReportInfoCounter*>::iterator iter;
	char buf[256] = { 0x0 };
	// Risky (from original author): entries may be mutated concurrently by
	// the agent service threads while we iterate.
	for (iter = info->begin(); iter != info->end(); ++iter)
	{
		ReportInfoCounter* report = iter->second;
		if (nowtime - report->last_time < report->space_time && !flag)
			continue;
		switch (report->stype)
		{
		case PER_SUM: {
			long val = AtomicRetAndSwap(&report->value, 0);
			snprintf(buf, sizeof(buf), "%lld|%s|%ld", nowtime, iter->first.c_str(), val);
			break;
		};
		case PER_AVG: {
			long val = AtomicRetAndSwap(&report->value, 0);
			// BUG FIX: guard against space_time == 0 (division by zero).
			long interval = report->space_time > 0 ? report->space_time : 1;
			snprintf(buf, sizeof(buf), "%lld|%s|%ld", nowtime, iter->first.c_str(), val / interval);
			break;
		}
		case HIS_SUM : {
			snprintf(buf, sizeof(buf), "%lld|%s|%ld", nowtime, iter->first.c_str(), report->value);
			break;
		};
		case HIS_AVG: {
			// BUG FIX: nowtime == start_time yielded a zero divisor.
			long time_length = long(nowtime - report->start_time);
			if (time_length <= 0) time_length = 1;
			snprintf(buf, sizeof(buf), "%lld|%s|%ld", nowtime, iter->first.c_str(), report->value / time_length);
			break;
		}
		default:
			break;
		}
		cus->vecData->push_back(buf);
		report->last_time = (int)nowtime;
	}
}

/*
 * Emit all periodic report rows. `flag` forces every section to write
 * regardless of its interval (used for the final flush).
 * Intervals: online/alive 5s, connects 1s, netflow 5s; custom counters keep
 * their own per-counter schedule.
 */
static void task_report_write(hServerTaskRun taskrun, hServerTaskConfig taskcfg, bool flag) {
	const time_t now = NOWTIME;

	TaskReportOnline* online = &taskrun->online;
	if (flag || now - online->last_report_write >= 5) {
		online->last_report_write = now;
		task_alive_user_reprot(taskrun, taskcfg, online, now, flag);
	}

	TaskReportConnect* connect = &taskrun->connect;
	if (flag || now - connect->last_report_write >= 1) {
		connect->last_report_write = now;
		task_connect_user_report(taskrun, taskcfg, connect, now);
	}

	TaskReportNetflow* net = &taskrun->netpack;
	if (flag || now - net->last_report_write >= 5) {
		net->last_report_write = now;
		task_netflow_user_report(taskrun, taskcfg, net, now, flag);
	}

	task_custom_user_report(taskrun, taskcfg, &taskrun->counter, now, flag);
}

/*
 * One STAGE_LOOP tick: flush periodic report rows, then — while the task is
 * still looping — feed recorded traffic to the agents and ramp users up if
 * the ramp interval has elapsed.
 */
static void task_push_message(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	task_report_write(taskrun, taskcfg, false);
	if (taskrun->stage != STAGE_LOOP)
		return;
	task_push_record_msg(taskrun, taskcfg);
	task_push_add_once_user(taskrun, taskcfg);
}

/*
 * One scheduler tick: dispatch on the task's current life-cycle stage.
 * STAGE_INIT starts the run, STAGE_LOOP drives replay/reporting, STAGE_CLEAN
 * keeps writing reports while users wind down; any other stage is a no-op.
 */
static void task_runing(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	if (taskrun->stage == STAGE_INIT)
		task_push_init(taskrun, taskcfg);
	else if (taskrun->stage == STAGE_LOOP)
		task_push_message(taskrun, taskcfg);
	else if (taskrun->stage == STAGE_CLEAN)
		task_report_write(taskrun, taskcfg, false);
}

/*
 * Tear down the record-replay pipeline: free any records fetched but never
 * pushed, the temp queue itself, the SQL buffer, and the sqlite handle.
 */
static void taskrun_clear_message(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	PushMessage* pushmsg = &taskrun->pushmsg;
	while (pushmsg->tempMsg && pushmsg->tempMsg->size())
	{
		RecordMsg* msg = pushmsg->tempMsg->front();
		// BUG FIX: msg->note (allocated in task_get_record_msg) was leaked
		// here — content and msg were freed, note was not.
		free(msg->note);
		free(msg->content);
		free(msg);
		pushmsg->tempMsg->pop_front();
	}
	if (pushmsg->tempMsg)
		delete pushmsg->tempMsg;

	if (pushmsg->dbsql)
		free(pushmsg->dbsql);
	if (pushmsg->dbConn)
		sqlite3_close(pushmsg->dbConn);
}

/*static void taskrun_clear_logfd(hServerTaskRun taskrun, hServerTaskConfig taskcfg)
{
	CloseLog(taskrun->online.report_fd);
	CloseLog(taskrun->error.report_fd);
	CloseLog(taskrun->connect.report_fd);
	CloseLog(taskrun->netpack.report_fd);
}*/

// Release all per-run report storage (error log, online rows, per-address
// connect/netflow stats, per-API samples, custom counters). The ServerTaskRun
// struct itself is NOT freed here — taskrun_init reuses and re-zeroes it.
void taskrun_clear(hServerTaskRun taskrun) {
	taskrun->error.vecData->clear();
	delete taskrun->error.vecData;
	delete taskrun->error.vecLock;

	taskrun->online.vecData->clear();
	delete taskrun->online.vecData;

	// Connect stats: the map owns malloc'd ReportInfoConnect entries.
	std::map<std::string, ReportInfoConnect*>* conninfo = taskrun->connect.info;
	std::map<std::string, ReportInfoConnect*>::iterator iter;
	for (iter = conninfo->begin(); iter != conninfo->end();){
		ReportInfoConnect* cinfo = iter->second;
		free(cinfo);
		conninfo->erase(iter++);
	}
	delete conninfo;
	delete taskrun->connect.vecData;

	// Netflow stats: same ownership pattern.
	std::map<std::string, ReportInfoNet*>* netinfo = taskrun->netpack.info;
	std::map<std::string, ReportInfoNet*>::iterator ite;
	for (ite = netinfo->begin(); ite != netinfo->end();){
		ReportInfoNet* ninfo = ite->second;
		free(ninfo);
		netinfo->erase(ite++);
	}
	delete netinfo;
	delete taskrun->netpack.vecData;

	// Per-API records: each entry additionally owns a new'd sample vector.
	std::map<std::string, ReportInfoApi*>* api = taskrun->api;
	std::map<std::string, ReportInfoApi*>::iterator itr;
	for (itr = api->begin(); itr != api->end(); ) {
		ReportInfoApi* info = itr->second;
		info->vecData->clear();
		delete info->vecData;
		free(info);
		api->erase(itr++);
	}
	delete api;

	// Custom counters.
	std::map<std::string, ReportInfoCounter*>* cusinfo = taskrun->counter.info;
	std::map<std::string, ReportInfoCounter*>::iterator it;
	for (it = cusinfo->begin(); it != cusinfo->end();){
		ReportInfoCounter* ninfo = it->second;
		free(ninfo);
		cusinfo->erase(it++);
	}
	delete cusinfo;
	delete taskrun->counter.vecData;
}

// Create (or recycle) the per-run state for a task. Report data may be cached
// from a previous run — if a taskrun already exists, its storage is cleared
// first, then the whole struct is re-zeroed and fresh containers allocated.
// Returns NULL only when the initial malloc fails.
// NOTE(review): the new(std::nothrow) allocations below are not checked; a
// failed one would surface as a NULL deref later — confirm acceptable.
static hServerTaskRun taskrun_init(hServerTaskConfig taskcfg)
{
	hServerTaskRun taskrun;
	if (taskcfg->taskrun) 
	{
		taskrun = taskcfg->taskrun;
		taskrun_clear(taskrun);
	}
	else {
		taskrun = (hServerTaskRun)malloc(sizeof(ServerTaskRun));
	}
	if (taskrun){
		memset(taskrun, 0, sizeof(ServerTaskRun));
		taskcfg->taskrun = taskrun;
		taskrun->error.vecData = new(std::nothrow) std::vector<std::string>;
		taskrun->error.vecLock = new(std::nothrow) std::mutex;

		taskrun->online.vecData = new(std::nothrow) std::vector<std::string>;

		taskrun->connect.info = new(std::nothrow) std::map<std::string, ReportInfoConnect*>;
		taskrun->connect.vecData = new(std::nothrow) std::vector<std::string>;

		taskrun->netpack.info = new(std::nothrow) std::map<std::string, ReportInfoNet*>;
		taskrun->netpack.vecData = new(std::nothrow) std::vector<std::string>;

		taskrun->api = new(std::nothrow) std::map<std::string, ReportInfoApi*>;

		taskrun->counter.info = new(std::nothrow) std::map<std::string, ReportInfoCounter*>;
		taskrun->counter.vecData = new(std::nothrow) std::vector<std::string>;
	}
	return taskrun;
}

// Delete any leftover log files from a previous run of this task id, then —
// if per-agent log reporting is enabled — record how many agent log uploads
// to wait for at shutdown (task_runing_thread spins until report_log drops
// to zero).
static void taskrun_init_log(hServerTaskConfig taskcfg, hServerTaskRun taskrun) {
	char path[256] = { 0x0 };
	snprintf(path, sizeof(path), "%s%ctasklog%c%d%ctask%d", EXE_Path, DIRChar, DIRChar, taskcfg->taskID, DIRChar, taskcfg->taskID);

	// Platform-specific bulk delete of "<path>*".
	char command[256] = { 0x0 };
#ifdef __WINDOWS__
	snprintf(command, sizeof(command), "DEL /q %s* 2>nul", path);
	system(command);
#else
	snprintf(command, sizeof(command), "rm -f %s*", path);
	system(command);
#endif // __WINDOWS__
	if (taskcfg->report_log) {
		taskrun->report_log = task_client_count(taskrun, taskcfg);
	}
}

// Per-task worker thread. Sets up the run state and log directory, then
// ticks task_runing every 10ms until the task passes STAGE_OVER or is
// force-stopped. On exit it drains the replay queue, sorts API samples,
// waits for any pending agent log uploads, marks the task stopped and fires
// the "task_stop" hook. Signature differs per platform (Win32 thread proc vs
// plain function used with std::thread).
#ifdef __WINDOWS__
DWORD WINAPI task_runing_thread(LPVOID pParam)
{
#else
#include <sys/prctl.h>
int task_runing_thread(void* pParam)
{
	prctl(PR_SET_NAME,"task_s");
#endif // __WINDOWS__
	hServerTaskConfig taskcfg = (hServerTaskConfig)pParam;
	hServerTaskRun taskrun = taskrun_init(taskcfg);
	if (!taskrun) return -1;
	taskrun_init_log(taskcfg, taskrun);
	taskrun->StartTime = NOWTIME;
	// Main scheduler loop: ~100 ticks/second.
	while (taskrun->stage < STAGE_OVER){
		if (taskrun->enforcestop) break;
		task_runing(taskrun, taskcfg);
		TimeSleep(10);
	}
	//task_report_write(taskrun, taskcfg, true);
	taskrun_clear_message(taskrun, taskcfg);
	task_user_api_sort(taskcfg, taskrun);
	// Wait until every expected agent log upload has arrived (report_log is
	// decremented elsewhere as uploads complete).
	while (taskrun->report_log){
		TimeSleep(1000);
	}
	taskcfg->taskState = STATE_STOP;
	PostEvent(server_hook, taskcfg, [](BaseWorker* work, void* taskcfg) {
		((ServerHookProtocol*)server_hook)->event_task((ServerTaskConfig*)taskcfg, "task_stop");
		});
	return 0;
}

// Launch a task: fire the "task_start" hook on the hook worker, then spawn
// the detached per-task worker thread (task_runing_thread) from within the
// hook event so start notification precedes the thread.
void task_run(hServerTaskConfig taskcfg){
	PostEvent(server_hook, taskcfg, [](BaseWorker* work, void* taskcfg) {
		((ServerHookProtocol*)server_hook)->event_task((ServerTaskConfig*)taskcfg, "task_start");
		std::thread th(task_runing_thread, taskcfg);
		th.detach();
		});
}